package com.deepglint.window;

import com.deepglint.beans.SensorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.IterableUtils;

/**
 * Demo job: reads comma-separated sensor records from a text socket, keys them by the
 * {@code id} field and aggregates the temperature over a sliding count window.
 *
 * <p>For every window evaluation the job prints a {@code Tuple3} of
 * (average temperature, temperature sum, element count).
 *
 * @author mj
 * @version 1.0
 * @date 2021-11-18 22:20
 */
public class WindowTest_CountWindow {

    /** Host of the text socket the job reads from. */
    private static final String SOURCE_HOST = "192.168.150.128";

    /** Port of the text socket the job reads from. */
    private static final int SOURCE_PORT = 7777;

    /** Size argument passed to {@code countWindow}. */
    private static final long WINDOW_SIZE = 5;

    /** Slide argument passed to {@code countWindow}. */
    private static final long WINDOW_SLIDE = 2;

    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> streamSource = env.socketTextStream(SOURCE_HOST, SOURCE_PORT);

        // Each input line is expected to carry at least four comma-separated fields; the third
        // is parsed as a long and the fourth as a double. A malformed line makes the map
        // function throw (ArrayIndexOutOfBoundsException / NumberFormatException).
        DataStream<SensorReading> streamMap = streamSource.map((MapFunction<String, SensorReading>) value -> {
            String[] split = value.split(",");
            // Long.valueOf / Double.valueOf replace the deprecated boxed-type constructors.
            return new SensorReading(split[0], split[1], Long.valueOf(split[2]), Double.valueOf(split[3]));
        });

        streamMap.print();

        // Keyed count window (size 5, slide 2) aggregated incrementally per key.
        DataStream<Tuple3<Double, Double, Integer>> resultStream = streamMap.keyBy("id")
                .countWindow(WINDOW_SIZE, WINDOW_SLIDE)
                .aggregate(new MyAggregateFunction());

        resultStream.print();

        env.execute();
    }

    /**
     * Incrementally aggregates sensor temperatures.
     *
     * <p>The accumulator is a {@code Tuple2} of (temperature sum, element count); the result is
     * a {@code Tuple3} of (average temperature, temperature sum, element count).
     */
    static class MyAggregateFunction implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Tuple3<Double, Double, Integer>> {

        /**
         * @return an empty accumulator: sum {@code 0.0}, count {@code 0}
         */
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }

        /**
         * Adds one reading to the accumulator.
         *
         * @param value       reading whose temperature is added to the running sum
         * @param accumulator current (sum, count)
         * @return a new accumulator with the temperature added and the count incremented
         */
        @Override
        public Tuple2<Double, Integer> add(SensorReading value, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0 + value.getTemperature(), accumulator.f1 + 1);
        }

        /**
         * Converts the accumulator into the emitted result.
         *
         * @param accumulator final (sum, count)
         * @return (sum / count, sum, count); the average is {@code NaN} for an empty accumulator
         *         because {@code 0.0 / 0} is evaluated in floating point
         */
        @Override
        public Tuple3<Double, Double, Integer> getResult(Tuple2<Double, Integer> accumulator) {
            return new Tuple3<>(accumulator.f0 / accumulator.f1, accumulator.f0, accumulator.f1);
        }

        /**
         * Combines two partial accumulators.
         *
         * @param a first partial (sum, count)
         * @param b second partial (sum, count)
         * @return a new accumulator holding the sums and the counts of both inputs
         */
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            // The count must combine BOTH inputs (previously b.f1 was added to itself,
            // dropping a's count and double-counting b's).
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }
}
